package org.codehaus.activemq;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.XAConnection;
import javax.management.j2ee.statistics.Stats;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.capacity.CapacityMonitorEvent;
import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
import org.codehaus.activemq.management.JMSConnectionStatsImpl;
import org.codehaus.activemq.management.JMSStatsImpl;
import org.codehaus.activemq.management.StatsCapable;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.CapacityInfo;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportStatusEvent;
import org.codehaus.activemq.transport.TransportStatusEventListener;
import org.codehaus.activemq.util.IdGenerator;

public class ActiveMQConnection
  implements Connection, PacketListener, ExceptionListener, TopicConnection, QueueConnection, StatsCapable, CapacityMonitorEventListener, TransportStatusEventListener
{
  /** Class-wide logger. */
  private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
  /** Default memory limit in bytes (10 * 1024 * 1024); the constructor passes the same value to the bounded queue manager. */
  private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10485760;
  /** Factory this connection was created from; notified on connection create and close. */
  private ActiveMQConnectionFactory factory;
  /** User name placed in the ConnectionInfo sent to the broker. */
  private String userName;
  /** Password placed in the ConnectionInfo sent to the broker. */
  private String password;
  /** Client identifier; set via setClientID or generated lazily by ensureClientIDInitialised. */
  protected String clientID;
  /** Generates the consumer ids used for connection consumers. */
  protected IdGenerator consumerIdGenerator;
  /** Generates a client id when none was supplied. */
  private IdGenerator clientIdGenerator;
  /** Generates the id stamped on every outgoing info packet. */
  protected IdGenerator packetIdGenerator;
  /** Generates session ids (see generateSessionId). */
  private IdGenerator sessionIdGenerator;
  /** Channel used for all communication with the broker; only assigned by the public constructor. */
  private TransportChannel transportChannel;
  /** Application-supplied listener for asynchronous failures; may be null. */
  private ExceptionListener exceptionListener;
  /** Sessions registered through addSession. */
  private CopyOnWriteArrayList sessions;
  /** Dispatchers offered every inbound JMS message in consume(). */
  private CopyOnWriteArrayList messageDispatchers;
  /** Connection consumers registered through addConnectionConsumer. */
  private CopyOnWriteArrayList connectionConsumers;
  /** Counter behind getNextConsumerNumber. */
  private SynchronizedInt consumerNumberGenerator;
  /** Metadata object returned by getMetaData. */
  private ActiveMQConnectionMetaData connectionMetaData;
  /** True once close() has completed. */
  private SynchronizedBoolean closed;
  /** True between start() and stop()/close(). */
  private SynchronizedBoolean started;
  /** Set by ensureClientIDInitialised; once true, setClientID is rejected. */
  private boolean clientIDSet;
  /** Set the first time a ConnectionInfo is sent; once true, setClientID is rejected. */
  private boolean isConnectionInfoSentToBroker;
  /** False after a transport exception or a failed reconnect; gates all packet sends. */
  private boolean isTransportOK;
  /** Creation time in milliseconds, reported to the broker in ConnectionInfo. */
  private long startTime;
  /** Prefetch policy exposed through getPrefetchPolicy/setPrefetchPolicy. */
  private ActiveMQPrefetchPolicy prefetchPolicy;
  /** Statistics for this connection. */
  private JMSConnectionStatsImpl stats;
  /** Statistics of the owning factory; this connection registers itself there. */
  private JMSStatsImpl factoryStats;
  /** Manager for memory-bounded queues; this connection listens to its capacity events. */
  private MemoryBoundedQueueManager boundedQueueManager;
  /** Delay applied before each asynchronous JMS message send; updated from CapacityInfo packets. */
  private long flowControlSleepTime = 0L;
  public static final String DEFAULT_USER = "defaultUser";
  public static final String DEFAULT_URL = "tcp://localhost:61616";
  public static final String DEFAULT_PASSWORD = "defaultPassword";
  /** True once setClientID has been called by the application. */
  private boolean userSpecifiedClientID;

  /**
   * Creates a connection from a default-constructed {@link ActiveMQConnectionFactory}.
   *
   * @return the newly created connection
   * @throws JMSException if the factory fails to create the connection
   */
  public static ActiveMQConnection makeConnection() throws JMSException {
    return (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
  }

  /**
   * Creates a connection from a factory configured with the given broker URI.
   *
   * @param uri the URI handed to the {@link ActiveMQConnectionFactory}
   * @return the newly created connection
   * @throws JMSException if the factory fails to create the connection
   */
  public static ActiveMQConnection makeConnection(String uri) throws JMSException {
    return (ActiveMQConnection) new ActiveMQConnectionFactory(uri).createConnection();
  }

  /**
   * Creates a connection from a factory configured with credentials and a broker URI.
   *
   * @param user     the user name handed to the factory
   * @param password the password handed to the factory
   * @param uri      the URI handed to the factory
   * @return the newly created connection
   * @throws JMSException if the factory fails to create the connection
   */
  public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException {
    return (ActiveMQConnection) new ActiveMQConnectionFactory(user, password, uri).createConnection();
  }

  /**
   * Builds a connection bound to a transport channel and starts that channel.
   * The connection registers itself as the channel's packet listener,
   * exception listener and transport status listener.
   *
   * @param factory          the factory creating this connection
   * @param theUserName      user name reported to the broker
   * @param thePassword      password reported to the broker
   * @param transportChannel the channel used to talk to the broker
   * @throws JMSException if the channel cannot be started
   */
  public ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword, TransportChannel transportChannel) throws JMSException {
    this(factory, theUserName, thePassword);
    this.transportChannel = transportChannel;

    // Wire this connection into the channel before starting it.
    transportChannel.setPacketListener(this);
    transportChannel.setExceptionListener(this);
    transportChannel.addTransportStatusEventListener(this);
    transportChannel.start();

    this.isTransportOK = true;
  }

  /**
   * Initialises all connection state except the transport channel, registers
   * the connection with the factory statistics and finally notifies the factory.
   *
   * @param factory     the factory creating this connection
   * @param theUserName user name reported to the broker
   * @param thePassword password reported to the broker
   */
  protected ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword) {
    this.factory = factory;
    userName = theUserName;
    password = thePassword;

    // Id generators and counters.
    clientIdGenerator = new IdGenerator();
    packetIdGenerator = new IdGenerator();
    consumerIdGenerator = new IdGenerator();
    sessionIdGenerator = new IdGenerator();
    consumerNumberGenerator = new SynchronizedInt(0);

    // Registries of the objects owned by this connection.
    sessions = new CopyOnWriteArrayList();
    messageDispatchers = new CopyOnWriteArrayList();
    connectionConsumers = new CopyOnWriteArrayList();

    connectionMetaData = new ActiveMQConnectionMetaData();
    closed = new SynchronizedBoolean(false);
    started = new SynchronizedBoolean(false);
    startTime = System.currentTimeMillis();
    prefetchPolicy = new ActiveMQPrefetchPolicy();

    // clientID has not been assigned yet at this point, so the manager is created with a null name.
    boundedQueueManager = new MemoryBoundedQueueManager(clientID, (long) DEFAULT_CONNECTION_MEMORY_LIMIT);
    boundedQueueManager.addCapacityEventListener(this);

    factoryStats = factory.getFactoryStats();
    factoryStats.addConnection(this);
    // The stats object is flagged transactional when this instance is an XAConnection.
    stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);

    factory.onConnectionCreate(this);
  }

  /**
   * @return the statistics object of this connection
   */
  public Stats getStats() {
    return stats;
  }

  /**
   * @return the same statistics object as {@link #getStats()}, typed as the concrete implementation
   */
  public JMSConnectionStatsImpl getConnectionStats() {
    return stats;
  }

  /**
   * Creates a session on this connection, generating a client id first if none exists.
   *
   * @param transacted      when true the session is created in transacted mode and
   *                        {@code acknowledgeMode} is ignored
   * @param acknowledgeMode acknowledgement mode used for non-transacted sessions
   * @return the new session
   * @throws JMSException if the connection is closed or the session cannot be created
   */
  public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    checkClosed();
    ensureClientIDInitialised();
    int sessionMode = transacted ? Session.SESSION_TRANSACTED : acknowledgeMode;
    return new ActiveMQSession(this, sessionMode);
  }

  /**
   * @return the current client id, which may be null if none has been set or generated yet
   * @throws JMSException if the connection is closed
   */
  public String getClientID() throws JMSException {
    checkClosed();
    return clientID;
  }

  /**
   * Assigns an application-chosen client id and records that it was specified manually.
   * <p>
   * The "already set" flag checked here is only raised by
   * {@link #ensureClientIDInitialised()}, not by this method itself.
   *
   * @param newClientID the client id to use
   * @throws JMSException if the id was already fixed, connection info has already been
   *                      sent to the broker, or the connection is closed
   */
  public void setClientID(String newClientID) throws JMSException {
    if (clientIDSet) {
      throw new IllegalStateException("The clientID has already been set");
    }
    if (isConnectionInfoSentToBroker) {
      throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
    }
    checkClosed();

    clientID = newClientID;
    userSpecifiedClientID = true;
  }

  /**
   * @return the metadata object created with this connection
   * @throws JMSException if the connection is closed
   */
  public ConnectionMetaData getMetaData() throws JMSException {
    checkClosed();
    return connectionMetaData;
  }

  /**
   * @return the application exception listener, or null if none was registered
   * @throws JMSException if the connection is closed
   */
  public ExceptionListener getExceptionListener() throws JMSException {
    checkClosed();
    return exceptionListener;
  }

  /**
   * Registers the application exception listener and installs the same listener
   * on the transport channel.
   * <p>
   * NOTE(review): the constructor installs this connection as the channel's exception
   * listener; this call replaces it with {@code listener} -- confirm that is intended.
   *
   * @param listener the listener to notify of asynchronous failures
   * @throws JMSException if the connection is closed
   */
  public void setExceptionListener(ExceptionListener listener) throws JMSException {
    checkClosed();
    exceptionListener = listener;
    transportChannel.setExceptionListener(listener);
  }

  /**
   * Starts message delivery. Does nothing when the connection is already started;
   * otherwise announces the connection to the broker, starts the transport channel
   * and starts every registered session.
   *
   * @throws JMSException if the connection is closed or start-up fails
   */
  public void start() throws JMSException {
    checkClosed();
    if (!this.started.commit(false, true)) {
      return; // already started
    }
    sendConnectionInfoToBroker();
    this.transportChannel.start();
    Iterator sessionIter = this.sessions.iterator();
    while (sessionIter.hasNext()) {
      ((ActiveMQSession) sessionIter.next()).start();
    }
  }

  /**
   * @return true while the connection is in the started state
   */
  protected boolean isStarted() {
    return started.get();
  }

  /**
   * Stops message delivery. Does nothing when the connection is not started;
   * otherwise stops every session, sends updated connection info to the broker
   * (2000 is passed as the send timeout) and stops the transport channel.
   *
   * @throws JMSException if the connection is closed or stopping fails
   */
  public void stop() throws JMSException {
    checkClosed();
    if (!this.started.commit(true, false)) {
      return; // was not started
    }
    Iterator sessionIter = this.sessions.iterator();
    while (sessionIter.hasNext()) {
      ((ActiveMQSession) sessionIter.next()).stop();
    }
    sendConnectionInfoToBroker(2000, this.closed.get());
    this.transportChannel.stop();
  }

  /**
   * Closes this connection: closes all sessions and connection consumers, tells the
   * broker the connection is closed, stops the transport channel and notifies the factory.
   * Calling it on an already closed connection is a no-op.
   * <p>
   * Local clean-up (clearing registries, stopping the transport, notifying the factory and
   * marking the connection closed) is performed even when closing a session or notifying
   * the broker fails, so a failed close can no longer leave the transport running or the
   * connection permanently "open". The original failure is still propagated to the caller.
   *
   * @throws JMSException if closing a session/consumer or notifying the broker fails
   */
  public synchronized void close() throws JMSException {
    if (this.closed.get()) {
      return;
    }
    this.boundedQueueManager.removeCapacityEventListener(this);
    try {
      for (Iterator i = this.sessions.iterator(); i.hasNext(); ) {
        ActiveMQSession s = (ActiveMQSession) i.next();
        s.close();
      }
      for (Iterator i = this.connectionConsumers.iterator(); i.hasNext(); ) {
        ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
        c.close();
      }
      // Must happen while 'closed' is still false, otherwise syncSendPacket refuses to send.
      sendConnectionInfoToBroker(2000, true);
    }
    finally {
      try {
        this.connectionConsumers.clear();
        this.messageDispatchers.clear();
        this.sessions.clear();
        this.started.set(false);
        this.transportChannel.stop();
      }
      finally {
        try {
          this.factory.onConnectionClose(this);
        }
        finally {
          // Always end up closed, whatever failed above.
          this.closed.set(true);
        }
      }
    }
  }

  /**
   * Guards operations that are illegal on a closed connection.
   *
   * @throws IllegalStateException if the connection has been closed
   */
  protected void checkClosed() throws IllegalStateException {
    boolean alreadyClosed = this.closed.get();
    if (alreadyClosed) {
      throw new IllegalStateException("The Connection is closed");
    }
  }

  /**
   * Creates a connection consumer for an arbitrary destination.
   *
   * @param destination     the destination to consume from
   * @param messageSelector selector expression stored in the consumer info
   * @param sessionPool     the server session pool handed to the consumer
   * @param maxMessages     value handed to the consumer as its message maximum
   * @return the new connection consumer
   * @throws JMSException if the connection is closed or the consumer cannot be created
   */
  public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
    checkClosed();

    ConsumerInfo consumerInfo = new ConsumerInfo();
    String packetId = packetIdGenerator.generateId();
    consumerInfo.setId(packetId);
    String consumerId = consumerIdGenerator.generateId();
    consumerInfo.setConsumerId(consumerId);
    consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
    consumerInfo.setSelector(messageSelector);

    return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo, maxMessages);
  }

  /**
   * Creates a connection consumer for a named (durable) topic subscription.
   *
   * @param topic            the topic to consume from
   * @param subscriptionName name stored in the consumer info as the consumer name
   * @param messageSelector  selector expression stored in the consumer info
   * @param sessionPool      the server session pool handed to the consumer
   * @param maxMessages      value handed to the consumer as its message maximum
   * @return the new connection consumer
   * @throws JMSException if the connection is closed or the consumer cannot be created
   */
  public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
    checkClosed();

    ConsumerInfo consumerInfo = new ConsumerInfo();
    String packetId = packetIdGenerator.generateId();
    consumerInfo.setId(packetId);
    String consumerId = consumerIdGenerator.generateId();
    consumerInfo.setConsumerId(consumerId);
    consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
    consumerInfo.setSelector(messageSelector);
    consumerInfo.setConsumerName(subscriptionName);

    return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo, maxMessages);
  }

  /**
   * @return the prefetch policy currently associated with this connection
   */
  public ActiveMQPrefetchPolicy getPrefetchPolicy() {
    return prefetchPolicy;
  }

  /**
   * Replaces the prefetch policy associated with this connection.
   *
   * @param prefetchPolicy the policy to use from now on
   */
  public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
    // The parameter shadows the field, so the qualifier is required here.
    this.prefetchPolicy = prefetchPolicy;
  }

  /**
   * Handles a packet received from the transport channel.
   * <p>
   * JMS messages are marked read-only, stamped with this connection's client id as
   * producer id and offered to every registered dispatcher that reports itself as a
   * target; the second and later targets each receive a deep copy. A packet of type 27
   * is cast to {@link CapacityInfo} and updates the flow-control sleep time.
   * Packets are ignored when the connection is closed or the packet is null.
   *
   * @param packet the inbound packet, may be null
   */
  public void consume(Packet packet) {
    if (this.closed.get() || packet == null) {
      return;
    }

    if (!packet.isJMSMessage()) {
      // 27 is presumably the capacity-info packet type constant -- the packet is cast accordingly.
      if (packet.getPacketType() == 27) {
        this.flowControlSleepTime = ((CapacityInfo) packet).getFlowControlTimeout();
      }
      return;
    }

    ActiveMQMessage message = (ActiveMQMessage) packet;
    message.setReadOnly(true);
    message.setProducerID(this.clientID);
    try {
      boolean alreadyDispatched = false;
      Iterator dispatcherIter = this.messageDispatchers.iterator();
      while (dispatcherIter.hasNext()) {
        ActiveMQMessageDispatcher candidate = (ActiveMQMessageDispatcher) dispatcherIter.next();
        if (!candidate.isTarget(message)) {
          continue;
        }
        if (alreadyDispatched) {
          // Each additional target gets its own copy of the message.
          message = message.deepCopy();
        }
        candidate.dispatch(message);
        alreadyDispatched = true;
      }
    }
    catch (JMSException jmsEx) {
      handleAsyncException(jmsEx);
    }
  }

  /**
   * Exception-listener callback (the constructor registers this connection on the
   * transport channel). Reports the failure, marks the transport as broken and
   * closes the connection; a failure while closing is only logged.
   *
   * @param jmsEx the asynchronous failure being reported
   */
  public void onException(JMSException jmsEx) {
    handleAsyncException(jmsEx);
    isTransportOK = false;
    try {
      close();
    } catch (JMSException closeFailure) {
      log.warn("Got an exception closing the connection", closeFailure);
    }
  }

  /**
   * Creates a topic session on this connection.
   * <p>
   * Like {@link #createSession(boolean, int)}, this makes sure a client id exists before
   * the session is constructed, so the session is never created against a connection
   * whose client id is still null.
   *
   * @param transacted      when true the session is created in transacted mode and
   *                        {@code acknowledgeMode} is ignored
   * @param acknowledgeMode acknowledgement mode used for non-transacted sessions
   * @return the new topic session
   * @throws JMSException if the connection is closed or the session cannot be created
   */
  public TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
    throws JMSException
  {
    checkClosed();
    ensureClientIDInitialised();
    return new ActiveMQSession(this, transacted ? Session.SESSION_TRANSACTED : acknowledgeMode);
  }

  /**
   * Creates a connection consumer for a topic.
   *
   * @param topic           the topic to consume from
   * @param messageSelector selector expression stored in the consumer info
   * @param sessionPool     the server session pool handed to the consumer
   * @param maxMessages     value handed to the consumer as its message maximum
   * @return the new connection consumer
   * @throws JMSException if the connection is closed or the consumer cannot be created
   */
  public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
    checkClosed();

    ConsumerInfo consumerInfo = new ConsumerInfo();
    String packetId = packetIdGenerator.generateId();
    consumerInfo.setId(packetId);
    String consumerId = consumerIdGenerator.generateId();
    consumerInfo.setConsumerId(consumerId);
    consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
    consumerInfo.setSelector(messageSelector);

    return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo, maxMessages);
  }

  /**
   * Creates a queue session on this connection.
   * <p>
   * Like {@link #createSession(boolean, int)}, this makes sure a client id exists before
   * the session is constructed, so the session is never created against a connection
   * whose client id is still null.
   *
   * @param transacted      when true the session is created in transacted mode and
   *                        {@code acknowledgeMode} is ignored
   * @param acknowledgeMode acknowledgement mode used for non-transacted sessions
   * @return the new queue session
   * @throws JMSException if the connection is closed or the session cannot be created
   */
  public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
    throws JMSException
  {
    checkClosed();
    ensureClientIDInitialised();
    return new ActiveMQSession(this, transacted ? Session.SESSION_TRANSACTED : acknowledgeMode);
  }

  /**
   * Creates a connection consumer for a queue.
   *
   * @param queue           the queue to consume from
   * @param messageSelector selector expression stored in the consumer info
   * @param sessionPool     the server session pool handed to the consumer
   * @param maxMessages     value handed to the consumer as its message maximum
   * @return the new connection consumer
   * @throws JMSException if the connection is closed or the consumer cannot be created
   */
  public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
    checkClosed();

    ConsumerInfo consumerInfo = new ConsumerInfo();
    String packetId = packetIdGenerator.generateId();
    consumerInfo.setId(packetId);
    String consumerId = consumerIdGenerator.generateId();
    consumerInfo.setConsumerId(consumerId);
    consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(queue));
    consumerInfo.setSelector(messageSelector);

    return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo, maxMessages);
  }

  /**
   * Verifies that the application explicitly supplied a client id via
   * {@link #setClientID(String)}.
   *
   * @throws JMSException if no client id was specified manually
   */
  public void checkClientIDWasManuallySpecified() throws JMSException {
    if (userSpecifiedClientID) {
      return;
    }
    throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
  }

  /**
   * Transport status callback. Logs the event and triggers {@link #doReconnect()}
   * when the channel status equals 3.
   *
   * @param event the status change reported by the transport channel
   */
  public void statusChanged(TransportStatusEvent event) {
    log.info("channel status changed: " + event);
    // 3 is presumably the "reconnected" status constant -- TODO confirm against TransportStatusEvent.
    boolean needsReconnect = event.getChannelStatus() == 3;
    if (needsReconnect) {
      doReconnect();
    }
  }

  /**
   * Sends a packet to the broker without waiting for a receipt.
   * <p>
   * The packet is silently dropped when the transport is marked broken or the connection
   * is closed. JMS messages are delayed by the current flow-control sleep time, if any.
   * If the thread is interrupted during that delay the interrupt status is restored
   * (instead of being swallowed) and the packet is still sent.
   *
   * @param packet the packet to send
   * @throws JMSException if the transport channel fails to send the packet
   */
  void asyncSendPacket(Packet packet)
    throws JMSException
  {
    if (!this.isTransportOK || this.closed.get()) {
      return;
    }
    packet.setReceiptRequired(false);
    // Read once so the check and the sleep use the same value even if a
    // CapacityInfo packet updates the field concurrently.
    long pauseMillis = this.flowControlSleepTime;
    if (packet.isJMSMessage() && pauseMillis > 0L) {
      try {
        Thread.sleep(pauseMillis);
      }
      catch (InterruptedException e) {
        // Preserve the interrupt for code further up the stack.
        Thread.currentThread().interrupt();
      }
    }
    this.transportChannel.asyncSend(packet);
  }

  /**
   * Sends a packet and waits for its receipt, passing 0 as the timeout.
   *
   * @param packet the packet to send
   * @throws JMSException if the send fails or the connection is unusable
   */
  void syncSendPacket(Packet packet) throws JMSException {
    final int timeout = 0;
    syncSendPacket(packet, timeout);
  }

  /**
   * Sends a packet with a receipt request and waits for the broker's receipt.
   * <p>
   * A missing (null) receipt is tolerated and treated as success; a failed receipt is
   * turned into a {@link JMSException} carrying the broker-side cause when available.
   * When the connection cannot send at all, the exception message states whether the
   * connection is closed or the transport is unavailable (previously this was
   * misreported as a timeout).
   *
   * @param packet  the packet to send
   * @param timeout timeout value handed to the transport channel
   * @throws JMSException if the connection is closed, the transport is unavailable,
   *                      or the broker reports a failure
   */
  void syncSendPacket(Packet packet, int timeout) throws JMSException {
    boolean connectionClosed = this.closed.get();
    if (!this.isTransportOK || connectionClosed) {
      throw new JMSException(connectionClosed
        ? "Connection closed."
        : "Transport channel is not available");
    }

    packet.setReceiptRequired(true);
    Receipt receipt = this.transportChannel.send(packet, timeout);
    if (receipt != null && receipt.isFailed()) {
      Throwable e = receipt.getException();
      if (e != null) {
        throw ((JMSException)new JMSException(e.getMessage()).initCause(e));
      }
      throw new JMSException("syncSendPacket failed with unknown exception");
    }
  }

  /**
   * Sends a packet with a receipt request and returns the broker's receipt.
   * <p>
   * A missing (null) receipt is reported as a {@link JMSException} rather than
   * surfacing as a NullPointerException, so callers always get a non-null receipt.
   *
   * @param packet the packet to send
   * @return the successful receipt, never null
   * @throws JMSException if the connection is closed or the transport is unavailable,
   *                      no receipt is returned, or the broker reports a failure
   */
  Receipt syncSendRequest(Packet packet) throws JMSException {
    if (!this.isTransportOK || this.closed.get()) {
      throw new JMSException("Connection closed.");
    }

    packet.setReceiptRequired(true);
    Receipt receipt = this.transportChannel.send(packet);
    if (receipt == null) {
      throw new JMSException("syncSendRequest failed: no receipt was returned");
    }
    if (receipt.isFailed()) {
      Throwable e = receipt.getException();
      if (e != null) {
        throw ((JMSException)new JMSException(e.getMessage()).initCause(e));
      }
      throw new JMSException("syncSendPacket failed with unknown exception");
    }
    return receipt;
  }

  /**
   * Registers a session with this connection, adds it as a message dispatcher and
   * sends the broker a session info packet flagged as started.
   *
   * @param session the session to register
   * @throws JMSException if the broker notification fails
   */
  protected void addSession(ActiveMQSession session) throws JMSException {
    sessions.add(session);
    addMessageDispatcher(session);

    SessionInfo startedInfo = createSessionInfo(session);
    startedInfo.setStarted(true);
    asyncSendPacket(startedInfo);
  }

  /**
   * Unregisters a session from this connection, removes it as a message dispatcher and
   * sends the broker a session info packet flagged as not started.
   *
   * @param session the session to unregister
   * @throws JMSException if the broker notification fails
   */
  protected void removeSession(ActiveMQSession session) throws JMSException {
    sessions.remove(session);
    removeMessageDispatcher(session);

    SessionInfo stoppedInfo = createSessionInfo(session);
    stoppedInfo.setStarted(false);
    asyncSendPacket(stoppedInfo);
  }

  /**
   * Builds a session info packet describing the given session. The started flag is
   * left for the caller to set.
   *
   * @param session the session to describe
   * @return a packet carrying a fresh packet id, this connection's client id and the
   *         session's id and start time
   */
  private SessionInfo createSessionInfo(ActiveMQSession session) {
    SessionInfo description = new SessionInfo();
    description.setId(packetIdGenerator.generateId());
    description.setClientId(clientID);
    description.setSessionId(session.getSessionId());
    description.setStartTime(session.getStartTime());
    return description;
  }

  /**
   * Registers a connection consumer and adds it as a message dispatcher.
   *
   * @param connectionConsumer the consumer to register
   * @throws JMSException declared by {@link #addMessageDispatcher}
   */
  protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
    connectionConsumers.add(connectionConsumer);
    addMessageDispatcher(connectionConsumer);
  }

  /**
   * Unregisters a connection consumer and removes it as a message dispatcher.
   * <p>
   * The consumer is removed from the registry; previously it was added again, which
   * left closed consumers registered until the connection itself was closed.
   *
   * @param connectionConsumer the consumer to unregister
   */
  protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer)
  {
    this.connectionConsumers.remove(connectionConsumer);
    removeMessageDispatcher(connectionConsumer);
  }

  /**
   * Adds a dispatcher to the list consulted for every inbound JMS message.
   *
   * @param messageDispatch the dispatcher to add
   * @throws JMSException declared for subclasses/callers; this implementation does not throw it
   */
  protected void addMessageDispatcher(ActiveMQMessageDispatcher messageDispatch) throws JMSException {
    messageDispatchers.add(messageDispatch);
  }

  /**
   * Removes a dispatcher from the list consulted for every inbound JMS message.
   *
   * @param messageDispatcher the dispatcher to remove
   */
  protected void removeMessageDispatcher(ActiveMQMessageDispatcher messageDispatcher) {
    messageDispatchers.remove(messageDispatcher);
  }

  /**
   * Reports an asynchronous failure to the application's exception listener, or logs
   * a warning when no listener is registered.
   *
   * @param jmsEx the failure to report
   */
  protected void handleAsyncException(JMSException jmsEx) {
    if (this.exceptionListener == null) {
      log.warn("async exception with no exception listener", jmsEx);
      return;
    }
    this.exceptionListener.onException(jmsEx);
  }

  /**
   * Sends connection info to the broker with a timeout of 0, reporting the current
   * closed state.
   *
   * @throws JMSException if the send fails
   */
  private void sendConnectionInfoToBroker() throws JMSException {
    boolean currentlyClosed = this.closed.get();
    sendConnectionInfoToBroker(0, currentlyClosed);
  }

  /**
   * Builds a {@link ConnectionInfo} describing this connection and sends it to the
   * broker, waiting for the receipt.
   * <p>
   * The client id is initialised <em>before</em> it is handed to the transport channel
   * on first use, so the channel never receives a null client id when the application
   * did not set one explicitly.
   *
   * @param timeout  timeout value handed to the synchronous send
   * @param isClosed value of the closed flag reported to the broker
   * @throws JMSException if the connection is unusable or the broker reports a failure
   */
  private void sendConnectionInfoToBroker(int timeout, boolean isClosed)
    throws JMSException
  {
    // Generate a client id first if none exists; everything below depends on it.
    ensureClientIDInitialised();
    if (!this.isConnectionInfoSentToBroker) {
      this.transportChannel.setClientID(this.clientID);
    }
    this.isConnectionInfoSentToBroker = true;
    ConnectionInfo info = new ConnectionInfo();
    info.setClientId(this.clientID);
    info.setHostName(IdGenerator.getHostName());
    info.setUserName(this.userName);
    info.setPassword(this.password);
    info.setId(this.packetIdGenerator.generateId());
    info.setStartTime(this.startTime);
    info.setStarted(this.started.get());
    info.setClosed(isClosed);
    syncSendPacket(info, timeout);
  }

  /**
   * Sets the value limit of the bounded queue manager used by this connection.
   *
   * @param newMemoryLimit the new limit
   */
  public void setConnectionMemoryLimit(int newMemoryLimit) {
    boundedQueueManager.setValueLimit(newMemoryLimit);
  }

  /**
   * @return the bounded queue manager's value limit, narrowed to an int
   */
  public int getConnectionMemoryLimit() {
    return (int) boundedQueueManager.getValueLimit();
  }

  /**
   * Capacity monitor callback. Forwards the monitor name and capacity to the broker in
   * a {@link CapacityInfo} packet; a send failure is wrapped and reported through
   * {@link #handleAsyncException(JMSException)}.
   *
   * @param event the capacity change being reported
   */
  public void capacityChanged(CapacityMonitorEvent event) {
    CapacityInfo capacityUpdate = new CapacityInfo();
    capacityUpdate.setId(packetIdGenerator.generateId());
    capacityUpdate.setResourceName(event.getMonitorName());
    capacityUpdate.setCapacity(event.getCapacity());

    try {
      asyncSendPacket(capacityUpdate);
    } catch (JMSException sendFailure) {
      JMSException wrapped = new JMSException("failed to send change in capacity");
      wrapped.setLinkedException(sendFailure);
      handleAsyncException(wrapped);
    }
  }

  /**
   * @return the result of incrementing the connection-wide consumer counter
   */
  protected int getNextConsumerNumber() {
    return consumerNumberGenerator.increment();
  }

  /**
   * @return a new id from the session id generator
   */
  protected String generateSessionId() {
    return sessionIdGenerator.generateId();
  }

  /**
   * Generates a client id when none has been assigned yet and marks the client id as
   * set, after which {@link #setClientID(String)} is rejected.
   */
  protected void ensureClientIDInitialised() {
    boolean missing = (clientID == null);
    if (missing) {
      clientID = clientIdGenerator.generateId();
    }
    clientIDSet = true;
  }

  /**
   * Looks up a memory-bounded queue by name via the connection's queue manager.
   *
   * @param name the queue name
   * @return whatever the bounded queue manager returns for that name
   */
  protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
    return boundedQueueManager.getMemoryBoundedQueue(name);
  }

  /**
   * Re-announces this connection to the broker: sends the connection info, then for
   * every registered session sends its session info (asynchronously) followed by the
   * info of each of its consumers and producers (synchronously), all flagged as started.
   * On failure the error is logged, reported through
   * {@link #handleAsyncException(JMSException)} and the transport is marked broken.
   */
  protected void doReconnect() {
    try {
      sendConnectionInfoToBroker();

      Iterator sessionIter = this.sessions.iterator();
      while (sessionIter.hasNext()) {
        ActiveMQSession current = (ActiveMQSession) sessionIter.next();

        SessionInfo sessionInfo = createSessionInfo(current);
        sessionInfo.setStarted(true);
        asyncSendPacket(sessionInfo);

        Iterator consumerIter = current.consumers.iterator();
        while (consumerIter.hasNext()) {
          ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumerIter.next();
          ConsumerInfo consumerInfo = current.createConsumerInfo(consumer);
          consumerInfo.setStarted(true);
          syncSendPacket(consumerInfo);
        }

        Iterator producerIter = current.producers.iterator();
        while (producerIter.hasNext()) {
          ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producerIter.next();
          ProducerInfo producerInfo = current.createProducerInfo(producer);
          producerInfo.setStarted(true);
          syncSendPacket(producerInfo);
        }
      }
    } catch (JMSException jmsEx) {
      log.error("Failed to do reconnection");
      handleAsyncException(jmsEx);
      this.isTransportOK = false;
    }
  }

/**
 * Creates a non-transacted session that acknowledges messages automatically
 * ({@link Session#AUTO_ACKNOWLEDGE}), instead of returning null.
 *
 * @return the new session
 * @throws JMSException if the connection is closed or the session cannot be created
 */
@Override
public Session createSession() throws JMSException {
  return createSession(false, Session.AUTO_ACKNOWLEDGE);
}

/**
 * Creates a session for the given session mode, instead of returning null.
 * {@link Session#SESSION_TRANSACTED} yields a transacted session; any other value is
 * used as the acknowledgement mode of a non-transacted session.
 *
 * @param sessionMode one of the {@link Session} mode constants
 * @return the new session
 * @throws JMSException if the connection is closed or the session cannot be created
 */
@Override
public Session createSession(int sessionMode) throws JMSException {
  boolean transacted = (sessionMode == Session.SESSION_TRANSACTED);
  return createSession(transacted, sessionMode);
}

/**
 * Shared connection consumers are not implemented by this connection. The call fails
 * with an explicit exception rather than returning null, which would only surface
 * later as a NullPointerException in the caller.
 *
 * @param arg0 the topic (unused)
 * @param arg1 the subscription name (unused)
 * @param arg2 the message selector (unused)
 * @param arg3 the server session pool (unused)
 * @param arg4 the maximum number of messages (unused)
 * @return never returns normally
 * @throws JMSException always, because the operation is not supported
 */
@Override
public ConnectionConsumer createSharedConnectionConsumer(Topic arg0, String arg1, String arg2, ServerSessionPool arg3,
    int arg4) throws JMSException {
  throw new JMSException("createSharedConnectionConsumer is not supported");
}

/**
 * Shared durable connection consumers are not implemented by this connection. The call
 * fails with an explicit exception rather than returning null, which would only surface
 * later as a NullPointerException in the caller.
 *
 * @param arg0 the topic (unused)
 * @param arg1 the subscription name (unused)
 * @param arg2 the message selector (unused)
 * @param arg3 the server session pool (unused)
 * @param arg4 the maximum number of messages (unused)
 * @return never returns normally
 * @throws JMSException always, because the operation is not supported
 */
@Override
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic arg0, String arg1, String arg2,
    ServerSessionPool arg3, int arg4) throws JMSException {
  throw new JMSException("createSharedDurableConnectionConsumer is not supported");
}
}